package com.atuguigu.flink.Day04;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;

//实时热门商品
public class Example1 {
    /**
     * Builds and runs the "hot items" job: reads user-behavior records from a CSV file, keeps
     * page-view ("pv") events, counts views per item in sliding event-time windows (1 hour long,
     * sliding every 5 minutes) and prints the top 3 items of every window.
     *
     * <p>Each input line is split on commas into five fields which are passed, in order, to the
     * {@link UserBehavior} constructor (userId, itemId, catagoryId, behaviorType, timestamp).
     * The fifth field is multiplied by 1000 before use as the event timestamp — presumably
     * converting seconds to milliseconds; verify against the data set.
     *
     * @param args optional; when present, {@code args[0]} is the path of the input CSV file.
     *             Without arguments the original default path is used.
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Allow the input location to be overridden from the command line; fall back to the
        // original hard-coded path so existing invocations behave exactly as before.
        String inputPath = (args != null && args.length > 0) ? args[0] : "E:\\data\\UserBehavior.csv";

        // Read the raw data, parse it and assign event-time timestamps.
        SingleOutputStreamOperator<UserBehavior> pvStream = env
                .readTextFile(inputPath)
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] arr = value.split(",");
                        return new UserBehavior(arr[0], arr[1], arr[2], arr[3], Long.parseLong(arr[4]) * 1000);
                    }
                })
                // Constant-first comparison: a record with a null behavior type is dropped
                // instead of crashing the job with a NullPointerException.
                .filter(r -> "pv".equals(r.behaviorType))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                                    @Override
                                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        // First compute the view count of every item in every window.
        SingleOutputStreamOperator<ItemViewCount> ivcStream = pvStream.keyBy(r -> r.itemId)
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .aggregate(new CountAgg(), new WindowResult());

        // Re-key by window end: each keyed partition then holds the ItemViewCount records of
        // all items that belong to the same window.
        KeyedStream<ItemViewCount, Long> ivcKeyedStream = ivcStream.keyBy(r -> r.windowEnd);

        ivcKeyedStream
                .process(new TopN(3))
                .print();

        env.execute();
    }

    /**
     * Collects the {@link ItemViewCount} records of one key and, once the event-time timer at
     * {@code windowEnd + 1} fires, emits a formatted report of the most viewed items.
     *
     * <p>The function is declared with a {@code Long} key; the timer is derived from
     * {@code value.windowEnd}, so it is meant to run on a stream keyed by window end.
     */
    public static class TopN extends KeyedProcessFunction<Long,ItemViewCount,String> {
        // Keyed list state buffering every ItemViewCount seen for the current key.
        // Initialised in open(); transient because it is a runtime handle, not configuration.
        private transient ListState<ItemViewCount> itemViewCountListState;
        // Maximum number of items to include in each report.
        private final Integer threshold;

        /**
         * @param threshold maximum number of top items to report per window
         */
        public TopN(Integer threshold) {
            this.threshold = threshold;
        }

        /**
         * Obtains the keyed list state handle from the runtime context.
         *
         * @param parameters configuration passed by the runtime
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // The handle must be stored in the field; discarding it leaves the field null and
            // every later state access fails. The regular runtime context is used because this
            // function does not run inside an iteration.
            itemViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<ItemViewCount>("list-state", Types.POJO(ItemViewCount.class))
            );
        }

        /**
         * Buffers the incoming record and registers the timer that triggers the ranking.
         *
         * @param value per-item view count of one window
         * @param ctx   context giving access to the timer service
         * @param out   unused here; results are emitted from {@link #onTimer}
         * @throws Exception if the state update fails
         */
        @Override
        public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
            // Add the record to the list state.
            itemViewCountListState.add(value);
            // Fire once the watermark passes windowEnd + 1 ms, i.e. when all counts of this
            // window have arrived. This has to be an event-time timer: windowEnd is an
            // event-time value, and a processing-time timer at that instant would not wait for
            // the watermark.
            ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
        }

        /**
         * Sorts the buffered records by view count (highest first) and emits the report.
         *
         * @param timestamp timestamp of the firing timer ({@code windowEnd + 1})
         * @param ctx       timer context
         * @param out       collector receiving the formatted report
         * @throws Exception if state access fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // Copy the buffered records out of the list state.
            ArrayList<ItemViewCount> itemViewCountArrayList = new ArrayList<>();
            for (ItemViewCount ivc : itemViewCountListState.get()) {
                itemViewCountArrayList.add(ivc);
            }
            // Release the state of this window; the local copy is what gets ranked.
            itemViewCountListState.clear();

            // Descending by count. Long.compare avoids the overflow/truncation of subtracting
            // int values derived from Long counts.
            Comparator<ItemViewCount> byCountDescending =
                    (left, right) -> Long.compare(right.count, left.count);
            itemViewCountArrayList.sort(byCountDescending);

            // A window may contain fewer distinct items than the requested threshold.
            int reported = Math.min(this.threshold, itemViewCountArrayList.size());

            StringBuilder result = new StringBuilder();
            result
                    .append("===================================\n");
            for (int i = 0; i < reported; i++) {
                ItemViewCount itemViewCount = itemViewCountArrayList.get(i);
                result
                        .append("浏览量No." + (i + 1) + " ")
                        .append("商品ID:" + itemViewCount.itemId + " ")
                        .append("浏览量:" + itemViewCount.count + " ")
                        // timestamp is windowEnd + 1, so subtract 1 to print the window end.
                        .append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            }
            result.append("=====================================\n");
            out.collect(result.toString());
        }
    }

    /**
     * Incremental aggregate that counts the {@link UserBehavior} records it receives.
     * The accumulator and the result are both the running count.
     */
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

        /** @return a fresh accumulator starting at zero */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /**
         * @param value       the record being counted (its content is not inspected)
         * @param accumulator count so far
         * @return the count including {@code value}
         */
        @Override
        public Long add(UserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        /**
         * @param accumulator final count
         * @return the count unchanged
         */
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts. Returning {@code null} here would corrupt the
         * aggregate whenever the runtime merges accumulators, so the counts are summed.
         *
         * @param a first partial count
         * @param b second partial count
         * @return {@code a + b}
         */
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }




    /**
     * Wraps the pre-aggregated view count of one key and window into an
     * {@link ItemViewCount} carrying the key and the window's start and end.
     */
    public static class WindowResult extends ProcessWindowFunction<Long, ItemViewCount, String, TimeWindow> {

        /**
         * @param s        the key of the window
         * @param context  gives access to the window metadata
         * @param elements the window contents; only the first element is read
         * @param out      collector receiving the resulting {@link ItemViewCount}
         * @throws Exception declared by the overridden method
         */
        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<ItemViewCount> out) throws Exception {
            // Only the first element of the iterable is used as the count.
            final Long viewCount = elements.iterator().next();
            final TimeWindow window = context.window();
            final ItemViewCount summary =
                    new ItemViewCount(s, viewCount, window.getStart(), window.getEnd());
            out.collect(summary);
        }
    }

    /**
     * View count of one item within one window. Public fields and the public no-argument
     * constructor are kept so the type can be used as a POJO (see {@code Types.POJO} usage).
     */
    public static class ItemViewCount{
        // Identifier of the item.
        public String itemId;
        // Number of views counted for the item in the window.
        public Long count;
        // Window start as epoch milliseconds.
        public Long windowStart;
        // Window end as epoch milliseconds.
        public Long windowEnd;

        /** Creates an instance with all fields unset ({@code null}). */
        public ItemViewCount() {
        }

        /**
         * @param itemId      identifier of the item
         * @param count       number of views in the window
         * @param windowStart window start, epoch milliseconds
         * @param windowEnd   window end, epoch milliseconds
         */
        public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {
            this.itemId = itemId;
            this.count = count;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }

        /**
         * Renders the record with the window bounds formatted as {@link Timestamp}s.
         * Unset bounds are printed as {@code null} instead of failing while unboxing.
         *
         * @return a human-readable description of this record
         */
        @Override
        public String toString() {
            // Guard the Long -> long unboxing: an instance created through the no-argument
            // constructor has null bounds.
            Timestamp start = windowStart == null ? null : new Timestamp(windowStart);
            Timestamp end = windowEnd == null ? null : new Timestamp(windowEnd);
            return "ItemViewCount{" +
                    "itemId='" + itemId + '\'' +
                    ", count=" + count +
                    ", windowStart=" + start +
                    ", windowEnd=" + end +
                    '}';
        }
    }
    /**
     * POJO describing one user-behavior record. Public fields and the public no-argument
     * constructor are kept so the type can be used as a POJO.
     */
    public static class UserBehavior{
        public String userId; // user id
        public String itemId; // item id
        // Category id. The misspelled name is kept: it is a public field and renaming it
        // would break code that reads or writes it.
        public String catagoryId;
        public String behaviorType; // behavior type
        public Long timestamp; // event timestamp

        /** Creates an instance with all fields unset ({@code null}). */
        public UserBehavior() {
        }

        /**
         * @param userId       user id
         * @param itemId       item id
         * @param catagoryId   category id
         * @param behaviorType behavior type
         * @param timestamp    event timestamp
         */
        public UserBehavior(String userId, String itemId, String catagoryId, String behaviorType, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.catagoryId = catagoryId;
            this.behaviorType = behaviorType;
            this.timestamp = timestamp;
        }

        /**
         * Renders the record with the timestamp formatted as a {@link Timestamp}.
         * An unset timestamp is printed as {@code null} instead of failing while unboxing.
         *
         * @return a human-readable description of this record
         */
        @Override
        public String toString() {
            // Guard the Long -> long unboxing for instances whose timestamp is not set.
            Timestamp eventTime = timestamp == null ? null : new Timestamp(timestamp);
            // The label now matches the class name (it previously read "UserBehivior").
            return "UserBehavior{" +
                    "userId='" + userId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", catagoryId='" + catagoryId + '\'' +
                    ", behaviorType='" + behaviorType + '\'' +
                    ", timestamp=" + eventTime +
                    '}';
        }
    }
}
